package org.ykx.demo.thread

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.Seconds
import scala.actors.threadpool.ExecutorService
import scala.actors.threadpool.Executors

object SaveHdfsTextFile {

  /**
   * Topic name -> file-name prefix used when persisting that topic to HDFS.
   *
   * Using a Map lookup with a default instead of a `match` also avoids the
   * historical `scala.MatchError: Set(TMR_110d1001)` raised for unmapped
   * topics (previously handled by a `case _ =>` fallback).
   */
  private val topicPrefixes: Map[String, String] = Map(
    "TMR_1001001" -> "ssbm_",
    "TMR_1003001" -> "ydjbm_",
    "TMR_1101001" -> "dldy_",
    "TMR_1201001" -> "glys_",
    "TMR_1102001" -> "xj_",
    // NOTE(review): every other prefix ends with '_' but this one does not —
    // confirm whether "dybm" (vs "dybm_") is intentional before changing it.
    "TMR_1002001" -> "dybm",
    "TMR_1103001" -> "yhfh_"
  )

  /**
   * Resolves the HDFS file-name prefix for a topic set.
   *
   * @param topic topic names; they are concatenated via `mkString`, so the
   *              lookup only succeeds when the set holds exactly one known topic
   * @return the mapped prefix, or "" when the concatenated name is unknown
   */
  def getFileTypeName(topic: Set[String]): String =
    topicPrefixes.getOrElse(topic.mkString, "")

  def main(args: Array[String]): Unit = {
    // Fixed-size pool of 7 worker threads (legacy scala.actors.threadpool API).
    val pool: ExecutorService = Executors.newFixedThreadPool(7)
    try {
      // Intended to submit 5 consumer tasks; submissions are currently disabled.
      (1 to 5).foreach { i =>
        // pool.execute(new ThreadDemo("thread" + i))
        ()
      }
    } finally {
      // Always release the pool, even if task submission fails.
      pool.shutdown()
    }
  }

}

// Worker thread class: consumes one Kafka topic and writes windowed batches to HDFS.
/**
 * Runnable Kafka consumer: reads a direct stream for `topic`, windows it,
 * and saves each non-empty windowed batch as a text file on HDFS.
 *
 * @param sc          streaming context driving the job
 * @param kafkaParams Kafka connection/consumer configuration
 * @param topic       topic set to consume (expected to contain one known topic)
 */
class SaveHdfsTextFile(sc: StreamingContext, kafkaParams: Map[String, String], topic: Set[String]) extends Runnable {

  override def run(): Unit = {
    // Direct (receiver-less) stream with String keys/values.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](sc, kafkaParams, topic)
    // 120-second window with a 120-second slide, i.e. tumbling, non-overlapping batches.
    val windowed = stream.window(Seconds(30 * 4), Seconds(30 * 4))

    windowed.foreachRDD { rdd =>
      val recordCount = rdd.count()
      println("==========[" + topic + "_RDD COUNT]: " + recordCount)
      if (recordCount > 0) {
        // Keep only the message payloads; Kafka keys are discarded.
        val payloads = rdd.map(record => record._2)
        // NOTE(review): the output path is constant per topic, and
        // saveAsTextFile fails if the target directory already exists —
        // confirm a later window does not collide with an earlier one.
        val fileName = SaveHdfsTextFile.getFileTypeName(topic) + ".txt"
        payloads.saveAsTextFile("hdfs://master:8020/data/" + fileName)
      }
    }
  }
}